import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Small Flink job that feeds three hard-coded {@code (name, cnt)} rows through a
 * table aggregate function registered under the name {@code test4} and prints the
 * resulting retract stream.
 *
 * <p>{@code TestTableAggregateFunction} is defined outside this file.
 */
public class UDTAF {

    /**
     * Builds the pipeline (source -> view {@code t4} -> group by {@code name} ->
     * {@code flatAggregate} -> retract stream -> print) and executes it.
     *
     * @param args command-line arguments; not read by this program
     * @throws Exception propagated from the Flink calls, including {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // The source produces generic Row objects, so the field types are declared explicitly.
        DataStream<Row> ds4 = env
                .addSource(new FixedRowsSource())
                .returns(Types.ROW(Types.STRING, Types.INT));

        // Expose the stream as view "t4" with columns name and cnt.
        tEnv.createTemporaryView("t4", ds4, "name,cnt");
        tEnv.registerFunction("test4", new TestTableAggregateFunction());

        // Per name, apply test4 to cnt and keep the two columns it yields (v, rank).
        Table table5 = tEnv.sqlQuery("select * from t4")
                .groupBy("name")
                .flatAggregate("test4(cnt) as (v,rank)")
                .select("name,v,rank");

        // Each element pairs a Boolean flag with the affected row.
        DataStream<Tuple2<Boolean, Row>> res4 = tEnv.toRetractStream(table5, Row.class);
        res4.print()
                .name("Aggregate Functions Print")
                .setParallelism(1);

        env.execute();
    }

    /**
     * Source that emits three rows, all named {@code "a"}, with counts 1, 2 and 100
     * (in that order), and then returns from {@link #run}.
     */
    private static class FixedRowsSource extends RichSourceFunction<Row> {

        /**
         * Emits the fixed rows to the given context.
         *
         * @param ctx context the rows are collected into
         * @throws Exception declared by the overridden method
         */
        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            int[] counts = {1, 2, 100};
            for (int count : counts) {
                ctx.collect(rowOf("a", count));
            }
        }

        /** No-op: {@link #run} emits a fixed set of rows and returns on its own. */
        @Override
        public void cancel() {
        }

        /**
         * Creates a two-field row: field 0 holds the name, field 1 the count.
         *
         * @param name value for field 0
         * @param count value for field 1 (boxed to {@code Integer} when stored)
         * @return the populated row
         */
        private static Row rowOf(String name, int count) {
            Row row = new Row(2);
            row.setField(0, name);
            row.setField(1, count);
            return row;
        }
    }
}